

# Load the shared cleaning helpers this script calls further down.
# NOTE(review): the `names` lookup table is presumably defined in that file --
# confirm.
source("cleaning_utils.r")

# Rename the lookup's key column in place so it lines up with the
# `first_only` column built for each voter record.
setnames(names, old = "first_name", new = "first_only")

# Inventory of files to process; the driver loop at the bottom reads its
# `file`, `state` and `cycle` columns.
pull_files <- read_csv("l2/l2_inventory.csv")

# Columns to keep from each raw L2 file. parse_file() intersects this list
# with the header of the file being read, so names absent from a given file
# are skipped. Order is preserved from the original list.
vnames <- c(
  # Voter id, status and name parts
  "LALVOTERID", "Voters_Active",
  "Voters_FirstName", "Voters_MiddleName",
  "Voters_LastName", "Voters_NameSuffix",
  # Residence address and census geography
  "Residence_Addresses_AddressLine", "Residence_Addresses_City",
  "Residence_Addresses_State", "Residence_Addresses_Zip",
  "Residence_Addresses_CensusTract", "Residence_Addresses_CensusBlockGroup",
  "Residence_Addresses_CensusBlock",
  "Residence_Addresses_Latitude", "Residence_Addresses_Longitude",
  # Mailing address
  "Mailing_Addresses_AddressLine", "Mailing_Addresses_City",
  "Mailing_Addresses_State", "Mailing_Addresses_Zip",
  # Demographics, party, registration and districts
  "Voters_Gender", "Voters_Age", "Voters_BirthDate",
  "Parties_Description",
  "EthnicGroups_EthnicGroup1Desc",
  "CountyEthnic_LALEthnicCode", "CountyEthnic_Description",
  "Voters_CalculatedRegDate", "Voters_OfficialRegDate",
  "US_Congressional_District", "AddressDistricts_Change_Changed_CD",
  "Voters_FIPS",
  # CommercialData_* columns
  "CommercialData_Education", "CommercialData_EstHomeValue",
  "CommercialData_EstimatedIncome", "CommercialData_EstimatedIncomeAmount",
  "CommercialData_EstimatedHHIncome", "CommercialData_EstimatedHHIncomeAmount",
  "CommercialData_EstimatedAreaMedianHHIncome",
  "CommercialData_HomePurchaseDate", "CommercialData_HomePurchasePrice",
  "CommercialData_LandValue", "CommercialData_LikelyUnion",
  "CommercialData_AreaMedianHousingValue",
  "CommercialData_Occupation", "CommercialData_OccupationIndustry",
  "CommercialData_PropertyType", "CommercialData_StateIncomeDecile",
  # CommercialDataLL_* columns
  "CommercialDataLL_Home_Owner_Or_Renter", "CommercialDataLL_Net_Worth",
  "CommercialDataLL_HH_Net_Worth", "CommercialDataLL_Veteran",
  # FECDonors_* columns
  "FECDonors_AvgDonation", "FECDonors_AvgDonation_Range",
  "FECDonors_LastDonationDate", "FECDonors_NumberOfDonations",
  "FECDonors_PrimaryRecipientOfContributions",
  "FECDonors_TotalDonationsAmount", "FECDonors_TotalDonationsAmt_Range",
  # Election columns, in both dated and year-only spellings
  "General_2016-11-08", "General_2014-11-04",
  "General_2012-11-06", "General_2008-11-04",
  "Primary_2008", "Primary_2012", "Primary_2016", "Primary_2020",
  "General_2008", "General_2012", "General_2018", "General_2020",
  # Remaining columns
  "CommercialData_MedianHousingValue", "CommercialData_EstimatedMedianIncome",
  "2011_NEW_Congressional_District"
)

#' Clean one chunk of raw L2 voter records
#'
#' Converts name and address text to UTF-8 and lower case, builds the name
#' keys, moves PO-box numbers into their own columns, normalises zips and
#' street addresses with the project helpers, and attaches a numeric gender
#' code looked up from the global `names` table.
#'
#' @param dt A data frame / data.table containing the `Voters_*` name parts
#'   and the `Residence_Addresses_*` / `Mailing_Addresses_*` address line,
#'   city, state and zip columns used below.
#' @return A data.table with the cleaned input columns plus `first_only`,
#'   `last_only`, `first_m`, `last_sf`, `residence_po_num`, `mailing_po_num`
#'   and `gender` (1 = no match in `names`, 0 = 'm', 2 = 'f', NA otherwise).
split_modify <- function(dt) {
  dt <- dt %>%
    # Missing name parts become '' so str_c() below cannot turn a whole
    # combined name into NA.
    mutate(
      Voters_FirstName = str_replace_na(enc2utf8(Voters_FirstName), ''),
      Voters_MiddleName = str_replace_na(enc2utf8(Voters_MiddleName), ''),
      Voters_LastName = str_replace_na(enc2utf8(Voters_LastName), ''),
      Voters_NameSuffix = str_replace_na(enc2utf8(Voters_NameSuffix), '')
    ) %>%
    # first_m = first name + middle initial; last_sf = last name + suffix.
    mutate(
      first_m = str_c(Voters_FirstName, substring(Voters_MiddleName, 1, 1), sep = ' ') %>% str_squish(),
      last_sf = str_c(Voters_LastName, Voters_NameSuffix, sep = ' ') %>% str_squish()
    ) %>%
    mutate(
      first_only = str_to_lower(Voters_FirstName),
      last_only = str_to_lower(Voters_LastName),
      first_m = str_to_lower(first_m),
      last_sf = str_to_lower(last_sf),
      Residence_Addresses_AddressLine = str_to_lower(enc2utf8(Residence_Addresses_AddressLine)),
      Residence_Addresses_City = str_to_lower(enc2utf8(Residence_Addresses_City)),
      Residence_Addresses_State = str_to_lower(enc2utf8(Residence_Addresses_State)),
      Mailing_Addresses_AddressLine = str_to_lower(enc2utf8(Mailing_Addresses_AddressLine)),
      Mailing_Addresses_City = str_to_lower(enc2utf8(Mailing_Addresses_City)),
      Mailing_Addresses_State = str_to_lower(enc2utf8(Mailing_Addresses_State))
    ) %>%
    # A mailing address identical to the residence address carries no extra
    # information, so blank it out.
    mutate(
      Mailing_Addresses_AddressLine = case_when(
        Residence_Addresses_AddressLine == Mailing_Addresses_AddressLine ~ NA_character_,
        TRUE ~ Mailing_Addresses_AddressLine
      )
    ) %>%
    as.data.table()

  # Literal question marks are stripped before the address line is handed to
  # po_box_parser().
  dt[, residence_po_num := po_box_parser(str_remove_all(Residence_Addresses_AddressLine, '\\?'))]
  dt[, mailing_po_num := po_box_parser(str_remove_all(Mailing_Addresses_AddressLine, '\\?'))]

  dt[, Residence_Addresses_Zip := normalize_zip(enc2utf8(Residence_Addresses_Zip))]
  dt[, Mailing_Addresses_Zip := normalize_zip(enc2utf8(Mailing_Addresses_Zip))]

  dt <- dt %>%
    # Once a PO-box number has been extracted the address line is dropped, so
    # a record has either a box number or a street address.
    mutate(
      Residence_Addresses_AddressLine = case_when(
        !is.na(residence_po_num) ~ NA_character_,
        TRUE ~ Residence_Addresses_AddressLine
      ),
      Mailing_Addresses_AddressLine = case_when(
        !is.na(mailing_po_num) ~ NA_character_,
        TRUE ~ Mailing_Addresses_AddressLine
      )
    ) %>%
    # The lookup's key column is `first_only`: the script renames
    # `first_name` with setnames() right after sourcing the helpers, so a
    # join on `first_name` cannot find its key.
    left_join(
      names %>% mutate(gender = str_to_lower(gender)),
      by = 'first_only'
    ) %>%
    mutate(gender = case_when(is.na(gender) ~ 1, gender == 'm' ~ 0, gender == 'f' ~ 2)) %>%
    as.data.table()

  dt[, Residence_Addresses_AddressLine := usps_address(Residence_Addresses_AddressLine)]
  dt[, Mailing_Addresses_AddressLine := usps_address(Mailing_Addresses_AddressLine)]

  return(dt)

}

#' Keep the output columns in their final order
#'
#' @param d A data frame that already has the renamed address columns
#'   (`po_num`, `address`, `city`, `state`, `zip`, `tract`, `block_group`,
#'   `block`) together with the id, name-key, `row_n` and `year` columns.
#' @return `d` reduced to the listed columns, every column whose name starts
#'   with `CommercialData`, `FECDonors`, `General` or `Primary`, and whichever
#'   of the optional columns are present.
selector <- function(d) {
  # Passed through any_of(), so a column missing from `d` is skipped rather
  # than raising an error.
  optional_cols <- c(
    '2011_NEW_Congressional_District',
    'US_Congressional_District',
    'Voters_Gender',
    'Voters_Age',
    'Voters_BirthDate',
    'Parties_Description',
    'EthnicGroups_EthnicGroup1Desc',
    'CountyEthnic_LALEthnicCode',
    'CountyEthnic_Description',
    'AddressDistricts_Change_Changed_CD',
    'Voters_FIPS',
    'Voters_CalculatedRegDate',
    'Voters_OfficialRegDate',
    "Residence_Addresses_Latitude",
    "Residence_Addresses_Longitude"
  )

  select(
    d,
    # identifiers and name keys
    LALVOTERID, address_type, gender,
    first_only, last_only, first_m, last_sf,
    row_n, year,
    # address and census geography
    po_num, address, city, state, zip,
    tract, block_group, block,
    # prefix-matched column families
    starts_with('CommercialData'),
    starts_with('FECDonors'),
    starts_with('General'),
    starts_with('Primary'),
    any_of(optional_cols)
  )
}

#' Unzip, clean and write out one L2 voter file
#'
#' Extracts the archive into `sandbox/`, reads the wanted columns, cleans the
#' records in parallel with split_modify(), stacks residence and mailing
#' address rows, attaches a gender code and writes the result as a dataset
#' partitioned by `state`.
#'
#' @param f Path to the zipped L2 file.
#' @param s State label; lower-cased for the record id and the output path.
#' @param y Year/cycle label; stored in the `year` column and used in the
#'   output path.
#' @return The logical vector returned by file.remove() for the files cleaned
#'   out of `sandbox/`. Called for its side effects.
parse_file <- function(f, s, y) { # file, state, year

  # A run that failed part-way leaves its extract behind; clear the scratch
  # directory first so only this archive's files are picked up below.
  file.remove(list.files('sandbox/', full.names = TRUE))

  decompression <- system2("unzip", args = c("-o", f, "-d", 'sandbox/'), stdout = TRUE)
  print(decompression)

  # `pattern` is a regular expression: anchor it to the real extension rather
  # than matching "any character followed by tab" anywhere in the name.
  this_l2 <- list.files('sandbox/', pattern = '\\.tab$', full.names = TRUE)
  if (length(this_l2) != 1L) {
    stop(
      sprintf('Expected exactly one .tab file after unzipping %s, found %d',
              f, length(this_l2)),
      call. = FALSE
    )
  }

  # Read only the header to learn which of the wanted columns this file has.
  these_vnames <- data.table::fread(this_l2, sep = '\t', nrows = 0)
  keep_vnames <- vnames[vnames %in% colnames(these_vnames)]

  this_l2_df <- data.table::fread(
    this_l2,
    sep = "\t",
    select = keep_vnames,
    na.strings = '',
    # Read census and zip codes as text rather than letting fread guess a type.
    colClasses = c(
      'Residence_Addresses_CensusTract' = 'character',
      'Residence_Addresses_CensusBlockGroup' = 'character',
      'Residence_Addresses_CensusBlock' = 'character',
      'Residence_Addresses_Zip' = 'character',
      'Residence_Addresses_ZipPlus4' = 'character',
      'Mailing_Addresses_Zip' = 'character',
      'Mailing_Addresses_ZipPlus4' = 'character'
    )
  )

  this_l2_df[, row_n := .I]
  this_l2_df[, year := y]

  # Leave one core free, but never ask for fewer than one worker (single-core
  # machines, or detectCores() returning NA).
  n_cores <- max(1L, detectCores() - 1L, na.rm = TRUE)
  this_l2_df[, core := sample.int(n_cores, nrow(this_l2_df), replace = TRUE)]

  chunks <- split(this_l2_df, by = 'core')
  chunks <- mclapply(chunks, split_modify, mc.cores = n_cores)

  # mclapply() hands back failed chunks as 'try-error' objects instead of
  # raising; surface the first failure here.
  failed <- vapply(chunks, inherits, logical(1), what = 'try-error')
  if (any(failed)) {
    stop(
      sprintf('split_modify() failed on %d chunk(s) of %s: %s',
              sum(failed), f, as.character(chunks[[which(failed)[1L]]])),
      call. = FALSE
    )
  }
  this_l2_df <- rbindlist(chunks)

  print('pulling residential and mailing...')
  # residential
  this_res <- this_l2_df %>%
    filter((!is.na(residence_po_num)) | (!is.na(Residence_Addresses_AddressLine))) %>%
    mutate(address_type = 'r') %>%
    rename(
      po_num = residence_po_num,
      address = Residence_Addresses_AddressLine,
      city = Residence_Addresses_City,
      state = Residence_Addresses_State,
      zip = Residence_Addresses_Zip,
      tract = Residence_Addresses_CensusTract,
      block_group = Residence_Addresses_CensusBlockGroup,
      block = Residence_Addresses_CensusBlock
    ) %>%
    selector() %>%
    as.data.table()

  # mailing (census geography is still taken from the residence columns)
  this_mail <- this_l2_df %>%
    filter((!is.na(mailing_po_num)) |
             (!is.na(Mailing_Addresses_AddressLine))) %>%
    mutate(address_type = 'm') %>%
    rename(
      po_num = mailing_po_num,
      address = Mailing_Addresses_AddressLine,
      city = Mailing_Addresses_City,
      state = Mailing_Addresses_State,
      zip = Mailing_Addresses_Zip,
      tract = Residence_Addresses_CensusTract,
      block_group = Residence_Addresses_CensusBlockGroup,
      block = Residence_Addresses_CensusBlock
    ) %>%
    selector() %>%
    as.data.table()

  # Record id: l2-<year>-<state>-<row>-<address type>.
  if (nrow(this_mail) > 0) {
    out <- rbind(arrow_table(this_res), arrow_table(this_mail)) %>%
      mutate(emmid = str_c('l2', year, tolower(s), row_n, address_type, sep = '-')) %>%
      as.data.table()
  } else {
    out <- this_res %>%
      mutate(emmid = str_c('l2', year, tolower(s), row_n, address_type, sep = '-')) %>%
      as.data.table()
  }

  # selector() already carries a `gender` column. Merging `names` on top of it
  # would produce gender.x / gender.y and leave no `gender` for the recode
  # below, so drop the stale copy before the lookup.
  if ('gender' %in% colnames(out)) {
    out[, gender := NULL]
  }
  out <- merge(out, names, all.x = TRUE, by = 'first_only')

  # Lower-case the lookup value (as split_modify() does) before recoding:
  # unknown or unmatched -> 1, male -> 0, female -> 2.
  out[, gender := str_to_lower(gender)]
  out[, gender := fcase(gender %chin% 'u' | is.na(gender), 1L, gender %chin% 'm', 0L, gender %chin% 'f', 2L, default = NA)]

  write_dataset(
    out,
    glue('data/l2/year={y}/state_file={tolower(s)}'),
    partitioning = 'state',
    max_rows_per_file = 500000
  )

  file.remove(list.files('sandbox/', full.names = TRUE))
}

# Process every file listed in the inventory. seq_len() gives an empty
# sequence for an empty inventory, whereas 1:nrow() would count 1, 0 and call
# parse_file() on rows that do not exist.
for (i in seq_len(nrow(pull_files))) {
  print(glue('Parsing L2 file {pull_files$file[i]}...'))
  parse_file(pull_files$file[i], pull_files$state[i], pull_files$cycle[i])
}
